package com.boat.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds a Kafka {@link Producer} bean from the
 * {@code kafka.producer.*} application properties.
 *
 * @author boat
 * @date 2018-01-10 18:15:54
 * @version V1.0
 */
@Configuration
public class KafkaConfig {

	/** Passed to the producer as {@code bootstrap.servers}. */
	@Value("${kafka.producer.servers}")
	private String servers;

	/** Passed to the producer as {@code retries}. */
	@Value("${kafka.producer.retries}")
	private int retries;

	/** Passed to the producer as {@code batch.size}. */
	@Value("${kafka.producer.batch.size}")
	private int batchSize;

	/**
	 * Passed to the producer as {@code linger.ms}.
	 * Batches requests together: fewer requests at the cost of added latency.
	 * Per the original author's note, batching only kicks in when records are
	 * produced faster than they can be sent, or it serves to reduce load.
	 */
	@Value("${kafka.producer.linger}")
	private int linger;

	/**
	 * Passed to the producer as {@code buffer.memory}: the size of the memory
	 * buffer used when records are produced faster than they can be sent.
	 * Declared as {@code long} because Kafka documents this setting as a long;
	 * an {@code int} field would reject values above {@link Integer#MAX_VALUE}
	 * when the property is bound.
	 */
	@Value("${kafka.producer.buffer.memory}")
	private long bufferMemory;

	/** Passed to the producer as {@code key.serializer}. */
	@Value("${kafka.producer.key.serializer}")
	private String keySerializer;

	/** Passed to the producer as {@code value.serializer}. */
	@Value("${kafka.producer.value.serializer}")
	private String valueSerializer;

	/**
	 * Passed to the producer as {@code acks}.
	 * The original author's note describes leader-only acknowledgement: the
	 * leader has written the record to its local log, and if the followers
	 * have not yet replicated it when the leader fails, the message is lost.
	 * NOTE(review): the effective guarantee depends on the configured value.
	 */
	@Value("${kafka.producer.acks}")
	private String acks;

	/**
	 * Creates the Kafka producer from the injected property values.
	 *
	 * @return a new {@link KafkaProducer} configured with the settings above
	 */
	@Bean
	public Producer<String, String> getProduce() {
		Map<String, Object> configs = new HashMap<>();
		configs.put("bootstrap.servers", servers);
		configs.put("retries", retries);
		configs.put("batch.size", batchSize);
		configs.put("linger.ms", linger);
		configs.put("buffer.memory", bufferMemory);
		configs.put("key.serializer", keySerializer);
		configs.put("value.serializer", valueSerializer);
		configs.put("acks", acks);
		return new KafkaProducer<>(configs);
	}
}
